Refactor caching of data for multilayer algorithms (#118)
knoepfel merged 10 commits into Framework-R-D:main
Conversation
Pull request overview
Explores introducing a “repeater” concept for multi-layer consumption by refactoring how flush/caching signals are delivered through the flow graph, and adds a dedicated test harness to exercise the approach.
Changes:
- Added a new `test/repeater` test suite with an `index_router` and `repeater_node` prototype to explore cached product reuse across layers.
- Refactored flush propagation: flush messages are no longer routed through the multiplexer; they are broadcast via dedicated flusher nodes and consumed through new `flush_port()` receivers on core node types.
- Tightened several tests from ">=" to strict "==" execution-count expectations and exposed `layer_generator::layer_paths()` for routing.
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| test/repeater/repeater_node.hpp | Adds prototype repeater composite node with caching/flush-driven replay behavior. |
| test/repeater/repeater.cpp | Adds Catch2 test constructing a multi-layer graph using the repeater exploration nodes. |
| test/repeater/nodes.hpp | Adds provider/consumer and multi-argument consumer test nodes built around repeater nodes. |
| test/repeater/message_types.hpp | Defines test message types and matchers used by the repeater graph. |
| test/repeater/index_router.hpp | Declares index routing helper for broadcasting indices/flush tokens to multi-layer nodes. |
| test/repeater/index_router.cpp | Implements routing/backout logic and multilayer broadcasting. |
| test/repeater/CMakeLists.txt | Builds the repeater test and supporting index_router library. |
| test/CMakeLists.txt | Enables building the new test/repeater subdirectory. |
| test/hierarchical_nodes.cpp | Tightens execution count check for get_the_time. |
| test/cached_execution.cpp | Tightens cached execution checks from >= to ==. |
| test/allowed_families.cpp | Tightens provider execution count checks from >= to ==. |
| plugins/layer_generator.hpp | Exposes layer_paths() accessor for test routing. |
| phlex/core/store_counters.hpp | Adds detect_flush_flag::receive_flush(message const&) declaration to centralize flush handling. |
| phlex/core/store_counters.cpp | Implements receive_flush() and fixes map insertion logic for store_flag. |
| phlex/core/multiplexer.cpp | Stops routing flush messages through the multiplexer (now asserts non-flush only). |
| phlex/core/message_sender.hpp | Switches flush delivery dependency from multiplexer to flusher_t. |
| phlex/core/message_sender.cpp | Sends flush messages via flusher_t broadcast node instead of multiplexer. |
| phlex/core/fwd.hpp | Adds message forward declaration and introduces flusher_t alias. |
| phlex/core/framework_graph.hpp | Adds a graph-wide flusher_ and wires message_sender to it. |
| phlex/core/framework_graph.cpp | Connects graph-wide and unfold-local flushers to each node's new flush_port(). |
| phlex/core/declared_unfold.hpp | Adds flush_port(), per-unfold flusher(), and tracks child_layer on unfolds. |
| phlex/core/declared_unfold.cpp | Stores child_layer and uses parent-based flush store generation. |
| phlex/core/declared_transform.hpp | Adds flush_port() receiver node; main transform path now asserts non-flush. |
| phlex/core/declared_provider.hpp | Adds flush_port() receiver node; main provider path now asserts non-flush. |
| phlex/core/declared_predicate.hpp | Adds flush_port() receiver node; main predicate path now asserts non-flush. |
| phlex/core/declared_observer.hpp | Adds flush_port() receiver node; main observer path now asserts non-flush. |
| phlex/core/declared_fold.hpp | Adds flush_port() receiver node and removes flush handling from the main fold path. |
Codecov Report

@@           Coverage Diff           @@
##             main     #118      +/-  ##
==========================================
+ Coverage   82.24%   83.22%    +0.98%
==========================================
  Files         127      129        +2
  Lines        3103     3220      +117
  Branches      547      546        -1
==========================================
+ Hits         2552     2680      +128
+ Misses        334      329        -5
+ Partials     217      211        -6
... and 3 files with indirect coverage changes.
- Separate flush messages from regular stores
- Consolidate flush reception
- Use flush ports instead of conditional logic
- Remove 'flush' stage
- Introduce multilayer_join_node
- Add repeater_node test
- Add DOT file that depicts caching in data flow
marcpaterno left a comment:
Please see the comments attached to the code.
All comments addressed by commit aacdc3f
Issue #24 describes the need to adjust the caching of data products. The chosen adjustment is described at #24 (comment) and explained here.
Caching performed by the framework graph
The Phlex framework implements caching to ensure that algorithms can operate on data products from correctly associated data cells. The caching technology is based largely on oneTBB Flow Graph's `tbb::flow::join_node<...>` template. This join node is used in tag-matching mode, where the input data on the join node's ports are emitted together when the "tags" corresponding to the data across the input ports match.

To ensure that data from different data layers can be matched together, each data product (or, more properly, the product store that contains the data product) is included as part of a message that contains an ID. The message ID (an object of type `std::size_t`) is inspected by the `tbb::flow::join_node<...>` to match data from the join node's ports.

With the current implementation of oneTBB's Flow Graph, there is no way to tell a join node to (e.g.) reuse a particular data product from a run for a given set of spills within that run. The same data product must be repeatedly presented to the join node even though there may be many spills for one run. How this problem is solved depends on the implementation and is described below.
Current implementation
A workflow that demonstrates how this works with the present implementation is shown below.
The nodes `provide(A)`, `transform(A)`, and `observe(C)` should execute only once per `r` encountered. The `provide(B)`, `transform(B)`, and `transform(C, D)` nodes should execute once per `(r, s)` encountered.

In order to ensure that `transform(C, D)` can execute once per spill, the data product $c_r$ must be presented to the join node the same number of times (and with the same message ID) as each of the data products $d_{rs}$ coming from `transform(B)`. This is achieved by sending all of the spills to the `provide(A)` node, reusing any data products already created by that node, and then passing them downstream until the data product $c_r$ can be re-presented to the join node. This is awkward for various reasons:

- `provide(A)`, `transform(A)`, and `observe(C)` do not conceptually operate on spill-level data products.

These combined behaviors result in unpredictable results for the user.
With this PR
This PR localizes caching to only the nodes that require multiple inputs.
The benefits of this approach:

- `provide(A)`, `transform(A)`, and `observe(C)` operate only on run-level data products.

The major changes with this PR:

- The `multiplexer` ("router" in earlier diagrams) has been replaced by an `index_router` class (labeled "Index Router" in the above image).
- The `multilayer_join_node` is created in such a manner that only the standard `tbb::join_node<...>` is used to tag-match the inputs.
- The `multilayer_join_node` is created with "repeaters", which cache data products from the different layers while emitting them as needed to match the other inputs for the node.
- The `std::tuple<message>` message type is no longer used for nodes that consume only one data product, removing unnecessary complexity and operations in presenting the data to nodes.